---
title: 'Create sink consumer'
api: 'POST /sinks'
---

import SinkConsumerResponseSnippet from '/snippets/sink-consumer-response-snippet.mdx'
import SinkConsumerResponseExample from '/snippets/sink-consumer-single-response-example-snippet.mdx'

Creates a new sink consumer.

## Request fields

<ParamField body="name" type="string" required>
  The name of the sink consumer
</ParamField>

<ParamField body="status" type="string" required>
  The initial status of the sink consumer (active, disabled, paused)
</ParamField>

<ParamField body="source" type="object" required>
  The source configuration for the sink consumer
  <Expandable title="source properties">
    <ParamField body="include_schemas" type="array">
      List of schemas to include, or null to include all
    </ParamField>
    <ParamField body="exclude_schemas" type="array">
      List of schemas to exclude, or null to exclude none
    </ParamField>
    <ParamField body="include_tables" type="array">
      List of tables to include, or null to include all

      A table name can be specified with schema (e.g. `my-schema.my-table`) or without (e.g. `my-table`) which defaults to schema `public`.
    </ParamField>
    <ParamField body="exclude_tables" type="array">
      List of tables to exclude, or null to exclude none

      A table name can be specified with schema (e.g. `my-schema.my-table`) or without (e.g. `my-table`) which defaults to schema `public`.
    </ParamField>
  </Expandable>
</ParamField>

<ParamField body="tables" type="array">
  Additional configuration for individual tables.

  To configure which tables are consumed, see the `source` field.

  <Expandable title="tables properties">
    <ParamField body="name" type="string" required>
      The name of the table.

      A table name can be specified with schema (e.g. `my-schema.my-table`) or without (e.g. `my-table`) which defaults to schema `public`.
    </ParamField>
    <ParamField body="group_column_names" type="array">
      The column names to group by or `null` to disable grouping
    </ParamField>
  </Expandable>
</ParamField>

<ParamField body="actions" type="array" required>
  The database actions to include in the sink (insert, update, delete)
</ParamField>

<ParamField body="destination" type="object" required>
  The destination configuration for the sink consumer. The shape varies by destination type.
  <Expandable title="Kafka">
    <ParamField body="type" type="string" required>
      Must be "kafka"
    </ParamField>
    <ParamField body="hosts" type="string" required>
      Comma-separated list of Kafka hosts
    </ParamField>
    <ParamField body="tls" type="boolean">
      Whether to use TLS
    </ParamField>
    <ParamField body="topic" type="string" required>
      Kafka topic name
    </ParamField>
    <ParamField body="username" type="string">
      Optional username for authentication
    </ParamField>
    <ParamField body="password" type="string">
      Optional password for authentication
    </ParamField>
    <ParamField body="sasl_mechanism" type="string">
      Optional SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM)
    </ParamField>
    <ParamField body="aws_region" type="string">
      AWS region (required for AWS_MSK_IAM)
    </ParamField>
    <ParamField body="aws_access_key_id" type="string">
      AWS access key ID (required for AWS_MSK_IAM)
    </ParamField>
    <ParamField body="aws_secret_access_key" type="string">
      AWS secret access key (required for AWS_MSK_IAM)
    </ParamField>
  </Expandable>

  <Expandable title="SQS">
    <ParamField body="type" type="string" required>
      Must be "sqs"
    </ParamField>
    <ParamField body="queue_url" type="string" required>
      SQS queue URL
    </ParamField>
    <ParamField body="region" type="string" required>
      AWS region
    </ParamField>
    <ParamField body="access_key_id" type="string" required>
      AWS access key ID
    </ParamField>
    <ParamField body="secret_access_key" type="string" required>
      AWS secret access key
    </ParamField>
    <ParamField body="is_fifo" type="boolean">
      Whether the queue is a FIFO queue
    </ParamField>
  </Expandable>

  <Expandable title="Kinesis">
    <ParamField body="type" type="string" required>
      Must be "kinesis"
    </ParamField>
    <ParamField body="stream_arn" type="string" required>
      Kinesis stream ARN
    </ParamField>
    <ParamField body="access_key_id" type="string" required>
      AWS access key ID
    </ParamField>
    <ParamField body="secret_access_key" type="string" required>
      AWS secret access key
    </ParamField>
  </Expandable>

  <Expandable title="RabbitMQ">
    <ParamField body="type" type="string" required>
      Must be "rabbitmq"
    </ParamField>
    <ParamField body="host" type="string" required>
      RabbitMQ host
    </ParamField>
    <ParamField body="port" type="integer" required>
      RabbitMQ port
    </ParamField>
    <ParamField body="username" type="string" required>
      Username for authentication
    </ParamField>
    <ParamField body="password" type="string" required>
      Password for authentication
    </ParamField>
    <ParamField body="virtual_host" type="string" required>
      Virtual host name (default: "/")
    </ParamField>
    <ParamField body="tls" type="boolean">
      Whether to use TLS
    </ParamField>
    <ParamField body="exchange" type="string" required>
      Exchange name
    </ParamField>
  </Expandable>

  <Expandable title="Redis Stream">
    <ParamField body="type" type="string" required>
      Must be "redis_stream"
    </ParamField>
    <ParamField body="host" type="string" required>
      Redis host
    </ParamField>
    <ParamField body="port" type="integer" required>
      Redis port
    </ParamField>
    <ParamField body="stream_key" type="string" required>
      Redis stream key
    </ParamField>
    <ParamField body="database" type="integer">
      Redis database number (default: 0)
    </ParamField>
    <ParamField body="tls" type="boolean">
      Whether to use TLS
    </ParamField>
    <ParamField body="username" type="string">
      Optional username for authentication
    </ParamField>
    <ParamField body="password" type="string">
      Optional password for authentication
    </ParamField>
  </Expandable>

  <Expandable title="Redis String">
    <ParamField body="type" type="string" required>
      Must be "redis_string"
    </ParamField>
    <ParamField body="host" type="string" required>
      Redis host
    </ParamField>
    <ParamField body="port" type="integer" required>
      Redis port
    </ParamField>
    <ParamField body="database" type="integer">
      Redis database number (default: 0)
    </ParamField>
    <ParamField body="tls" type="boolean">
      Whether to use TLS
    </ParamField>
    <ParamField body="username" type="string">
      Optional username for authentication
    </ParamField>
    <ParamField body="password" type="string">
      Optional password for authentication
    </ParamField>
    <ParamField body="expire_ms" type="integer">
      Optional TTL in milliseconds for the keys
    </ParamField>
    <ParamField body="mode" type="string">
      Mode of operation (static or dynamic, default: static)
    </ParamField>
  </Expandable>

  <Expandable title="Azure Event Hub">
    <ParamField body="type" type="string" required>
      Must be "azure_event_hub"
    </ParamField>
    <ParamField body="namespace" type="string" required>
      Event Hub namespace
    </ParamField>
    <ParamField body="event_hub_name" type="string" required>
      Event Hub name
    </ParamField>
    <ParamField body="shared_access_key_name" type="string" required>
      Shared access key name
    </ParamField>
    <ParamField body="shared_access_key" type="string" required>
      Shared access key
    </ParamField>
  </Expandable>

  <Expandable title="NATS">
    <ParamField body="type" type="string" required>
      Must be "nats"
    </ParamField>
    <ParamField body="host" type="string" required>
      NATS host
    </ParamField>
    <ParamField body="port" type="integer" required>
      NATS port
    </ParamField>
    <ParamField body="username" type="string">
      Optional username for authentication
    </ParamField>
    <ParamField body="password" type="string">
      Optional password for authentication
    </ParamField>
    <ParamField body="jwt" type="string">
      Optional JWT for authentication
    </ParamField>
    <ParamField body="nkey_seed" type="string">
      Optional NKey seed for authentication
    </ParamField>
    <ParamField body="tls" type="boolean">
      Whether to use TLS
    </ParamField>
  </Expandable>

  <Expandable title="GCP PubSub">
    <ParamField body="type" type="string" required>
      Must be "gcp_pubsub"
    </ParamField>
    <ParamField body="project_id" type="string" required>
      GCP project ID
    </ParamField>
    <ParamField body="topic_id" type="string" required>
      PubSub topic ID
    </ParamField>
    <ParamField body="credentials" type="string" required>
      Base64-encoded credentials
    </ParamField>
    <ParamField body="use_emulator" type="boolean">
      Whether to use the emulator
    </ParamField>
    <ParamField body="emulator_base_url" type="string">
      Emulator base URL if using emulator
    </ParamField>
  </Expandable>

  <Expandable title="Elasticsearch">
    <ParamField body="type" type="string" required>
      Must be "elasticsearch"
    </ParamField>
    <ParamField body="endpoint_url" type="string" required>
      Elasticsearch endpoint URL
    </ParamField>
    <ParamField body="index_name" type="string" required>
      Elasticsearch index name
    </ParamField>
    <ParamField body="auth_type" type="string" required>
      Authentication type (api_key, basic, bearer)
    </ParamField>
    <ParamField body="auth_value" type="string" required>
      Authentication value, either an API key or a base64-encoded string of a username and password
    </ParamField>
    <ParamField body="batch_size" type="integer">
      Number of documents to batch together (default: 100)
    </ParamField>
  </Expandable>

  <Expandable title="Meilisearch">
    <ParamField body="type" type="string" required>
      Must be "meilisearch"
    </ParamField>
    <ParamField body="endpoint_url" type="string" required>
      Meilisearch endpoint URL
    </ParamField>
    <ParamField body="index_name" type="string" required>
      Meilisearch index name
    </ParamField>
    <ParamField body="primary_key" type="string">
      Primary key field name (default: "id")
    </ParamField>
    <ParamField body="api_key" type="string" required>
      Meilisearch API key
    </ParamField>
    <ParamField body="batch_size" type="integer">
      Number of documents to batch together (default: 100)
    </ParamField>
    <ParamField body="timeout_seconds" type="integer">
      Request timeout in seconds (default: 5)
    </ParamField>
  </Expandable>

  <Expandable title="S2">
    <ParamField body="type" type="string" required>
      Must be "s2"
    </ParamField>
    <ParamField body="basin" type="string" required>
      The name of the S2 basin
    </ParamField>
    <ParamField body="stream" type="string" required>
      The name of the S2 stream
    </ParamField>
    <ParamField body="access_token" type="string" required>
      The access token for the S2 account associated with the basin and stream
    </ParamField>
  </Expandable>

  <Expandable title="Sequin Stream">
    <ParamField body="type" type="string" required>
      Must be "sequin_stream"
    </ParamField>
  </Expandable>

  <Expandable title="SNS">
    <ParamField body="type" type="string" required>
      Must be "sns"
    </ParamField>
    <ParamField body="topic_arn" type="string" required>
      SNS topic ARN
    </ParamField>
    <ParamField body="region" type="string" required>
      AWS region
    </ParamField>
    <ParamField body="access_key_id" type="string" required>
      AWS access key ID
    </ParamField>
    <ParamField body="secret_access_key" type="string" required>
      AWS secret access key
    </ParamField>
    <ParamField body="is_fifo" type="boolean">
      Whether the topic is a FIFO topic
    </ParamField>
  </Expandable>

  <Expandable title="Typesense">
    <ParamField body="type" type="string" required>
      Must be "typesense"
    </ParamField>
    <ParamField body="endpoint_url" type="string" required>
      Typesense endpoint URL
    </ParamField>
    <ParamField body="collection_name" type="string" required>
      Typesense collection name
    </ParamField>
    <ParamField body="api_key" type="string" required>
      Typesense API key
    </ParamField>
    <ParamField body="batch_size" type="integer">
      Number of documents to batch together (default: 100)
    </ParamField>
    <ParamField body="timeout_seconds" type="integer">
      Request timeout in seconds (default: 5)
    </ParamField>
  </Expandable>

  <Expandable title="Webhook">
    <ParamField body="type" type="string" required>
      Must be "webhook"
    </ParamField>
    <ParamField body="http_endpoint" type="string" required>
      HTTP endpoint name
    </ParamField>
    <ParamField body="http_endpoint_path" type="string" required>
      HTTP endpoint path
    </ParamField>
    <ParamField body="batch" type="boolean">
      Whether the webhook body [should be wrapped in a list](/reference/sinks/webhooks#request-format) to support batching. Defaults to `true`.
    </ParamField>
  </Expandable>
</ParamField>

<ParamField body="database" type="string" required>
  The source database for the sink consumer
</ParamField>

<ParamField body="enrichment" type="string">
  The [enrichment function](/reference/enrichment) for the sink consumer
</ParamField>

<ParamField body="filter" type="string">
  The [filter function](/reference/filters) for the sink consumer
</ParamField>

<ParamField body="transform" type="string">
  The [transform function](/reference/transforms) for the sink consumer
</ParamField>

<ParamField body="routing" type="string">
  The [routing function](/reference/routing) for the sink consumer
</ParamField>

<ParamField body="message_grouping" type="boolean">
  Whether message grouping is enabled for ordering purposes. When `true` (default), messages are grouped by primary key. When `false`, grouping is disabled for higher throughput batching.
</ParamField>

<ParamField body="batch_size" type="integer">
  Number of records to batch together (1-1000)
</ParamField>

<ParamField body="max_retry_count" type="integer">
  The maximum number of times a message will be retried if delivery fails. Once this limit is reached, the message will be discarded.
  Defaults to `null`, meaning messages are retried indefinitely.
</ParamField>

<ParamField body="load_shedding_policy" type="string">
  Determines how Sequin handles overload when sink consumers can't keep up with incoming messages.
  Options are:
  - `pause_on_full` (default) — pauses replication until the buffer has room again
  - `discard_on_full` — drops messages for overloaded consumers to avoid pausing replication

  For more details, see [load shedding policy](/reference/sinks/overview#load_shedding_policy).
</ParamField>

<ParamField body="timestamp_format" type="string">
  The format of the timestamp in the source data.
  Possible values include `iso8601` and `unix_microsecond`.
</ParamField>

<ParamField body="annotations" type="object">
  Additional metadata you can attach to this sink consumer. Annotations can be any JSON object.
</ParamField>

## Response fields

<SinkConsumerResponseSnippet />

<RequestExample>
  ```bash cURL
  curl -X POST "https://api.sequinstream.com/api/sinks" \
    -H "Authorization: Bearer YOUR_API_TOKEN" \
    -H "Content-Type: application/json" \
    -d '{
      "name": "kafka-ids",
      "status": "active",
      "source": {
        "include_schemas": null,
        "exclude_schemas": null,
        "include_tables": null,
        "exclude_tables": null
      },
      "actions": ["insert", "update", "delete"],
      "destination": {
        "type": "kafka",
        "hosts": "localhost:9092",
        "tls": false,
        "topic": "records"
      },
      "database": "dune",
      "filter": "",
      "transform": "id-transform",
      "routing": "",
      "message_grouping": true,
      "batch_size": 100,
      "max_retry_count": null,
      "load_shedding_policy": "pause_on_full",
      "timestamp_format": "iso8601",
      "annotations": {}
    }'
  ```
</RequestExample>

<SinkConsumerResponseExample />